package reactor.mytest;

 
import java.util.function.Consumer;
import java.util.function.Function;

import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;
 

/**
 * Demo of how {@code subscribeOn} and {@code publishOn} decide which thread each stage of a
 * Reactor {@link Mono} pipeline runs on.
 *
 * <p>Every stage prints its label followed by the current thread, so the effect of each
 * scheduler switch can be read directly from standard output.
 */
public class TestSchedulers {

    /**
     * Scheduler workers are created as daemon threads so that they cannot keep the JVM alive
     * after {@link #main(String[])} has returned.
     */
    private static final boolean DAEMON = true;

    /**
     * Builds the pipeline, runs it, and returns once the value has passed the last stage.
     *
     * <p>The pipeline is driven with {@code block()} instead of parking the main thread on a
     * monitor that nobody ever notifies: the demo now terminates on its own, and any error
     * raised inside the pipeline is rethrown here rather than being dropped.
     *
     * @param args unused
     * @throws InterruptedException kept for signature compatibility; not thrown directly
     */
    public static void main(String[] args) throws InterruptedException {
        Mono.<Object>create(sink -> {
                    trace("1");
                    sink.success("test");
                })
                // subscribeOn selects the thread the source is subscribed on. Only one of the
                // chained calls takes effect (the first one declared); the later ones are ignored.
                .subscribeOn(Schedulers.newSingle("Test11", DAEMON))
                .subscribeOn(Schedulers.newSingle("Test12", DAEMON))
                .subscribeOn(Schedulers.newSingle("Test13", DAEMON))
                // publishOn selects the thread for the operators that follow it. It may be used
                // repeatedly, declared right before each stage that should switch threads.
                .publishOn(Schedulers.newSingle("Test2", DAEMON))
                .map(value -> {
                    trace("2");
                    return value;
                })
                .publishOn(Schedulers.newSingle("Test3", DAEMON))
                .map(value -> {
                    trace("3");
                    return value;
                })
                .publishOn(Schedulers.newSingle("Test4", DAEMON))
                .flatMap(value -> {
                    trace("4");
                    return Mono.<Object>create(innerSink -> {
                                trace("just");
                                innerSink.success(value);
                            })
                            // The inner Mono has its own source, so it gets its own subscribeOn.
                            .subscribeOn(Schedulers.newSingle("just", DAEMON));
                })
                .publishOn(Schedulers.newSingle("Test5", DAEMON))
                .map(value -> {
                    trace("5");
                    return value;
                })
                .publishOn(Schedulers.newSingle("Test6", DAEMON))
                // Final consumer stage; placed after the last publishOn so it runs on that scheduler.
                .doOnNext(value -> trace("6"))
                // Subscribes and waits on the calling thread until the value (or an error) arrives.
                .block();
    }

    /** Prints {@code stage}, a colon, and the thread currently executing the caller. */
    private static void trace(String stage) {
        System.out.println(stage + ":" + Thread.currentThread().toString());
    }
}

